/**
Copyright (c) 2020, Nimblex Co., Ltd.
Created on 2020-12-10 11:59
**/

package main

import (
	"context"
	"flag"
	"fmt"
	"github.com/go-pg/pg/v10"
	"github.com/go-pg/pg/v10/orm"
	"strings"
	"sync"
)

// Command-line flags naming the memfire endpoint, credentials and database.
// main dereferences them after flag.Parse to fill in pg.Options.
var (
	addr   = flag.String("addr", "192.168.80.161:5433", "memfire address to connect")
	user   = flag.String("user", "test", "memfire user")
	passwd = flag.String("passwd", "test", "memfire password")
	dbname = flag.String("db", "dbname", "memfire database name to connect")
)

// panicIf aborts the program by panicking with err when err is non-nil.
// A nil err is a no-op.
func panicIf(err error) {
	if err == nil {
		return
	}
	panic(err)
}

// test_transaction_try_again starts ten goroutines that each increment the
// value stored in the counters table exactly once, and waits for all of them.
//
// Every increment runs in its own transaction. When the transaction fails
// with an error whose text contains "40001", "Try again" or
// "Restart read required", the goroutine prints "Try again" and repeats the
// attempt; any other error panics.
func test_transaction_try_again(db *pg.DB) {
	// bumpCounter reads the counter under FOR UPDATE and writes back value+1.
	// Transaction is automatically rolled back on error.
	bumpCounter := func(conn *pg.DB) error {
		return conn.RunInTransaction(conn.Context(), func(tx *pg.Tx) error {
			var current int
			if _, err := tx.QueryOne(
				pg.Scan(&current), `SELECT counter FROM counters FOR UPDATE`); err != nil {
				return err
			}

			_, err := tx.Exec(`UPDATE counters SET counter = ?`, current+1)
			return err
		})
	}

	// retryable reports whether the error text carries one of the markers
	// that this test treats as "run the transaction again".
	retryable := func(err error) bool {
		msg := err.Error()
		for _, marker := range []string{"40001", "Try again", "Restart read required"} {
			if strings.Contains(msg, marker) {
				return true
			}
		}
		return false
	}

	const workers = 10

	var pending sync.WaitGroup
	pending.Add(workers)
	for n := 0; n < workers; n++ {
		go func() {
			defer pending.Done()
			for {
				err := bumpCounter(db)
				if err == nil {
					return
				}
				if !retryable(err) {
					panic(err)
				}
				fmt.Println("Try again")
			}
		}()
	}
	pending.Wait()
}

// Counter is the go-pg model for the row that the transaction test increments.
// createSchema creates its table and main inserts one instance with Counter
// set to 1.
// NOTE(review): the raw SQL in this file addresses a table named "counters"
// with a column "counter"; presumably go-pg derives both from these
// identifiers, so renaming either would break those queries — confirm.
type Counter struct {
	Counter int64
}

// User is a go-pg model whose table is created by createSchema. Nothing else
// in this file reads or writes it.
type User struct {
	Id     int64
	Name   string
	Emails []string
}

// createSchema creates the database tables for the Counter and User models.
// Each table is created with IfNotExists set, so a table that is already
// present is not treated as an error.
//
// It stops at the first failure and returns that error wrapped with the Go
// type of the model whose table could not be created. It returns nil once
// every model has been processed.
func createSchema(db *pg.DB) error {
	models := []interface{}{
		(*Counter)(nil),
		(*User)(nil),
	}

	for _, model := range models {
		opts := &orm.CreateTableOptions{
			Temp:        false,
			IfNotExists: true,
		}
		if err := db.Model(model).CreateTable(opts); err != nil {
			// Name the offending model so the failure is actionable; %w keeps
			// the underlying go-pg error reachable via errors.Is / errors.As.
			return fmt.Errorf("creating table for %T: %w", model, err)
		}
	}
	return nil
}

// main parses the command-line flags, connects to the database, makes sure
// the schema exists, resets the counters table to a single row holding 1,
// and then runs the concurrent increment test. Any failure along the way
// panics via panicIf.
func main() {
	flag.Parse()

	// onConnect is installed as the Options.OnConnect hook; it only prints a
	// marker line and never rejects the connection.
	onConnect := func(ctx context.Context, cn *pg.Conn) error {
		println("new connection created")
		return nil
	}

	db := pg.Connect(&pg.Options{
		Addr:      *addr,
		User:      *user,
		Password:  *passwd,
		Database:  *dbname,
		OnConnect: onConnect,
	})
	defer db.Close()

	panicIf(createSchema(db))

	// Start from a clean table so exactly one counter row exists.
	_, err := db.Exec("delete from counters")
	panicIf(err)

	_, err = db.Model(&Counter{Counter: 1}).Insert()
	panicIf(err)

	test_transaction_try_again(db)
}
